package org.luo.lan.server.factory;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.luo.lan.common.util.Pair;
import org.luo.lan.server.ServerApplication;
import org.luo.lan.server.config.ServerConfig;
import org.luo.lan.server.config.ServerInfo;
import org.luo.lan.server.enums.ProxyTypeEnum;
import org.luo.lan.server.initializers.TcpTunnelServerInitializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

@Slf4j
public class BridgeChannelFactory {
    private ServerApplication application;
    public static BridgeChannelFactory INSTANCE = new BridgeChannelFactory();
    //循环创建tcpTunnel端口标识，为false表示停止创建
    private volatile boolean flag = true;
    private BridgeChannelFactory(){}
    BlockingQueue<Pair<String,Channel>> channelQueue = new ArrayBlockingQueue(100);

    public void setApplication(ServerApplication application) {
        this.application = application;
    }

    private Map<String, NioSocketChannel> clientKeyChannelMap = new ConcurrentHashMap<>();
    private Map<String, Channel> tcpTunnelChannelMap = new ConcurrentHashMap<>();
    //Map中的key为host或端口，Map中的value为Pair,Pair中的key存放的是clientKey，value存放的是target
    private Map<String, Pair<String, String>> proxyKeyPairMap = new ConcurrentHashMap<>();

    public Pair<Boolean, String> createBridgeChannel(String clientKey, NioSocketChannel channel) {
        AtomicReference<Pair<Boolean, String>> atomicPair = new AtomicReference<>(new Pair<>(true,"创建成功"));
        ServerInfo.Client client = ServerConfig.getInstance().getClient(clientKey);
        if (client != null) {
            List<Pair<String,String>> tcpTunnelList=ProxyTypeEnum.TCP_TUNNEL.getProxyKeys(client).stream().filter(t->!proxyKeyPairMap.containsKey(t.getKey())).collect(Collectors.toList());
            if (!tcpTunnelList.isEmpty()) {
                //如果有需要启动的tcpTunnel端口，需要先启动
                CountDownLatch latch = new CountDownLatch(tcpTunnelList.size());
                for (int i = 0; i < tcpTunnelList.size() && flag; i++) {
                    Pair<String, String> p = tcpTunnelList.get(i);
                    if (!proxyKeyPairMap.containsKey(p.getKey())) {
                        application.bindServerPort(new TcpTunnelServerInitializer(), Integer.valueOf(p.getKey())).addListener((ChannelFuture future) -> {
                            if (future.isSuccess()) {
                                log.debug("tcpTunnel端口{}启动成功！",p.getKey());
                                channelQueue.add(new Pair(p.getKey(),future.channel()));
                            } else {
                                flag=false;//不要再继续启动后面的tcpTunnel端口了
                                String errorMsg = "tcpTunnel端口" + p.getKey() + "启动失败，原因：" + future.cause().getMessage();
                                log.error(errorMsg);
                                atomicPair.set(new Pair<>(false, errorMsg));
                            }
                            latch.countDown();
                        });
                    }
                }
                try {
                    latch.await(15,TimeUnit.SECONDS);
                    if (atomicPair.get().getKey()) {
                        while (channelQueue.size() > 0) {
                            Pair<String, Channel> channelPair = channelQueue.take();
                            tcpTunnelChannelMap.put(channelPair.getKey(), channelPair.getValue());
                        }
                        putKeys(client, clientKey,channel);
                    } else {
                        //关闭已启动的端口
                        while (channelQueue.size() > 0) {
                            channelQueue.take().getValue().close();
                        }
                    }
                } catch (InterruptedException e) {
                    while (channelQueue.size() > 0) {
                        try {
                            channelQueue.take().getValue().close();
                        } catch (InterruptedException e1) {
                            log.error("catch中take线程被中断，e={}",e);
                        }
                    }
                    log.error("线程被中断，e={}",e);
                }finally {
                    flag=true;
                    channelQueue.clear();
                }
            }else{
                putKeys(client,clientKey,channel);
            }
        }
        return atomicPair.get();
    }

    private void putKeys(ServerInfo.Client client ,String clientKey,NioSocketChannel channel){
        clientKeyChannelMap.put(clientKey, channel);
        for (ProxyTypeEnum typeEnum : ProxyTypeEnum.values()) {
            typeEnum.getProxyKeys(client).forEach((k->{
                if (!proxyKeyPairMap.containsKey(k.getKey())) {
                    proxyKeyPairMap.put(k.getKey(), new Pair(clientKey, k.getValue()));
                }
            }));
        }
    }

    public Pair<Boolean, String> isExist(String clientKey) {
        Pair<Boolean, String> pair;
        if (clientKeyChannelMap.containsKey(clientKey)) {
            pair = new Pair<>(true, String.format("客户端 %s 已启动", clientKey));
            return pair;
        }
        /*for (ProxyTypeEnum proxyTypeEnum : ProxyTypeEnum.values()) {
            for (Pair k : proxyTypeEnum.getProxyKeys(clientKey)) {
                if (proxyKeyPairMap.containsKey(k.getKey())) {
                    //如果key相同，进一步判断target是不是相同，如果target不同则不允许
                    Pair existPair = proxyKeyPairMap.get(k.getKey());
                    if (!existPair.getValue().equals(k.getValue())) {
                        pair = new Pair<>(true, String.format("客户端 %s 中的 %s 与已启动的有冲突！", clientKey, proxyTypeEnum.getDesc()));
                        return pair;
                    }
                }
            }
        }*/
        pair = new Pair<>(false, "");
        return pair;
    }

    public synchronized void removeBridgeChannel(String clientKey) {
        clientKeyChannelMap.remove(clientKey);
        List removeList = new ArrayList();
        proxyKeyPairMap.forEach((k, v) -> {
            if (clientKey.equals(v.getKey())) {
                removeList.add(k);
            }
        });
        removeList.stream().forEach(k -> {
            proxyKeyPairMap.remove(k);
        });
        log.debug("客户端{}移除成功！", clientKey);
    }

    public synchronized void closeTcpTunnelChannel(String clientKey) {
        ServerInfo.Client client = ServerConfig.getInstance().getClient(clientKey);
        if (client != null) {
            ProxyTypeEnum.TCP_TUNNEL.getProxyKeys(client).stream().forEach(k->{
                if (tcpTunnelChannelMap.containsKey(k.getKey())) {
                    tcpTunnelChannelMap.get(k.getKey()).close();
                    log.debug("tcpTunnel端口{}关闭成功！",k.getKey());
                }
            });
        }

    }

    public void closeBridgeChannel(String clientKey) {
        clientKeyChannelMap.forEach((k, v) -> {
            if (k.equals(clientKey) && v != null) {
                log.info("客户端{}关闭中...", clientKey);
                v.close();
            }
        });
    }

    public NioSocketChannel getChannel(String proxyKey) {
        Pair<String, String> pair = proxyKeyPairMap.get(proxyKey);
        if (pair == null) return null;
        return clientKeyChannelMap.get(pair.getKey());
    }

    public String getTarget(String proxyKey) {
        Pair<String, String> pair = proxyKeyPairMap.get(proxyKey);
        if (pair == null) return null;
        return pair.getValue();
    }

}
